-
Notifications
You must be signed in to change notification settings - Fork 92
feat: add support for batch execution in parallel with custom Executor #1900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for batch execution in parallel with custom Executor #1900
Conversation
...ols-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java
Show resolved
Hide resolved
@phipag sonar is failing due to duplication in Kinesis and DynamoDB handlers. The same duplication exists for the already existing method as well. As a result, I followed the same pattern for the new method too. If you want, we can move the duplicated block for both (older and newer) methods to a common static utlilty method but IMO this duplication is ok. Let me know. |
@phipag could you please unblock this during this iteration? I think there's some pending feedback we need to give to the contributor. Thanks! |
return handler.processBatchInParallel(kinesisEvent, context, executor); | ||
} | ||
|
||
private void processMessage(Product p, Context c) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove Context c
here please. There is an overload for a message handler without context in AbstractBatchMessageHandlerBuilder.java
/**
* Builds a BatchMessageHandler that can be used to process batches, given
* a user-defined handler to process each item in the batch. This variant
* takes a function that consumes the deserialized body of the given message
* If deserialization fails, it will be treated as
* failure of the processing of that item in the batch.
* Note: If you don't need the Lambda context, use the variant of this function
* that does not require it.
*
* @param handler Processes the deserialized body of the message
* @return A BatchMessageHandler for processing the batch
*/
public <M> BatchMessageHandler<E, R> buildWithMessageHandler(Consumer<M> handler, Class<M> messageClass) {
return buildWithMessageHandler((f, c) -> handler.accept(f), messageClass);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return handler.processBatchInParallel(ddbEvent, context, executor); | ||
} | ||
|
||
private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can remove Context c
here please. There is an overload for a message handler without context in AbstractBatchMessageHandlerBuilder.java
/**
* Builds a BatchMessageHandler that can be used to process batches, given
* a user-defined handler to process each item in the batch. This variant
* takes a function that consumes a raw message and the Lambda context. This
* is useful for handlers that need access to the entire message object, not
* just the deserialized contents of the body.
*
* @param handler Takes a raw message - the underlying AWS Events Library event - to process.
* For instance for SQS this would be an SQSMessage.
* @return A BatchMessageHandler for processing the batch
*/
public BatchMessageHandler<E, R> buildWithRawMessageHandler(Consumer<T> handler) {
return buildWithRawMessageHandler((f, c) -> handler.accept(f));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The older examples in the package follow the same pattern as the new one. Do you want those changed as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this would be awesome if you can update those as well. Ideally, we have no Sonar or pmd_analyze findings before merging the PR. With the exception of the code duplication finding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.../java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java
Show resolved
Hide resolved
...ols-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java
Show resolved
Hide resolved
How come did the code duplication increase so much? Is this old code that is now being scanned, or new code? Can we do anything to improve it? |
Hey @visheshruparelia, I finally reviewed your PR and it works great. I just tested it end-to-end using the examples you also added. Thanks for this! 🚀 Can you please have a look at the pmd_analyse comments in the diff and address them? I added comments to some but there are more such as:
I think we can just catch Regarding the code duplication reported by Sonarcube, I think it is okay in this case since it is actually easier to read for developers than creating more abstractions in addition to the existing ones. One last thing is the documentation page here. Do you mind documenting the new feature that you implemented there? You could add a second example showing how to pass a custom executor to this code snippet as a second tab: |
The code duplication was there before and triggered again as a finding in this PR due to implementing the new method overload for @Override
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context, Executor executor) {
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
List<CompletableFuture<Void>> futures = event.getRecords().stream()
.map(eventRecord -> CompletableFuture.runAsync(() -> {
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
failureOpt.ifPresent(batchItemFailures::add);
multiThreadMDC.removeThread(Thread.currentThread().getName());
}, executor))
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
} This code snippet is essentially the same for DynamoDB and Kinesis (code duplication reported here) – but it is different for SQS because we need to check if we are dealing with a FIFO queue. It would require some larger refactoring if we wanted to remove the code duplication completely. My personal opinion is, that we should favor the current code duplication because:
If we think that we should refactor though, I am also okay with that. Let me know what you folks think @dreamorosi and @visheshruparelia . |
Sounds sensible, thanks for explaining. |
Hi @phipag, thank you for the review.
|
|
Thanks very much for your update @visheshruparelia! Can you just commit the suggestion that I made to highlight the correct lines in your new doc example? After this, we are ready to merge 🚀 |
Co-authored-by: Philipp Page <[email protected]>
@phipag done |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. LGTM 🥳 .
Congrats on your first contribution @visheshruparelia !!
#1864
Description of changes:
Add a new method in the BatchHandler class which allows users to pass their custom Executor for processing messages in parallel.
Checklist
Breaking change checklist
RFC issue #:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.